/**
 * Copyright 2019 吉鼎科技.
 *
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package cn.easyplatform.spi.mqtt;

import cn.easyplatform.EasyPlatformRuntimeException;
import cn.easyplatform.spi.annotation.Plugin;
import cn.easyplatform.spi.extension.ApplicationContext;
import cn.easyplatform.spi.extension.ApplicationService;
import cn.easyplatform.type.StateType;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * MQTT client service that receives messages from a broker and publishes
 * commands to it.
 * <p>
 * Connection settings are read from the plugin configuration through
 * {@link ApplicationContext#getConfig}: {@code broker}, {@code clientId},
 * {@code userName}, {@code password}, {@code pubTopic}, {@code qos},
 * {@code projectId} and {@code logicId}.
 *
 * @author <a href="mailto:davidchen@epclouds.com">littleDog</a> <br/>
 * @since 2.0.0 <br/>
 */
@Plugin(config = "easyplatform-mqtt.conf")
public class MqttService implements ApplicationService, MqttCallback {

    protected final static Logger log = LoggerFactory.getLogger(MqttService.class);

    private ApplicationContext ctx;

    private MqttClient client;

    // Time at which the service was last started successfully.
    private Date startTime;

    private int state = StateType.STOP;

    /**
     * Creates the service. The constructor must accept an
     * {@link ApplicationContext} argument.
     *
     * @param ctx application context used to read configuration and to reach
     *            the project context
     */
    public MqttService(ApplicationContext ctx) {
        this.ctx = ctx;
    }

    /**
     * Starts the service: creates the MQTT client, registers this instance as
     * its callback and connects to the configured broker with a clean session.
     * <p>
     * Credentials are only applied when they are present in the configuration,
     * so a broker that does not require authentication can be used as well.
     *
     * @throws EasyPlatformRuntimeException if the client cannot be created or
     *                                      the connection cannot be established
     */
    public void start() {
        MqttClient created = null;
        try {
            MemoryPersistence persistence = new MemoryPersistence();
            created = new MqttClient(ctx.getConfig("broker"), ctx.getConfig("clientId"), persistence);
            client = created;

            MqttConnectOptions options = new MqttConnectOptions();
            String userName = ctx.getConfig("userName");
            if (userName != null) {
                options.setUserName(userName);
            }
            String password = ctx.getConfig("password");
            if (password != null) {
                options.setPassword(password.toCharArray());
            }
            options.setCleanSession(true);

            created.setCallback(this);
            created.connect(options);
            state = StateType.START;
            startTime = new Date();
        } catch (Exception ex) {
            log.error("start", ex);
            // Do not leave a half-initialised client holding resources.
            closeQuietly(created);
            throw new EasyPlatformRuntimeException("Could not create mqtt connection", ex);
        }
    }

    /**
     * Stops the service. When the service is started, the client is
     * disconnected and then closed so that its resources are released.
     * Failures are logged and do not prevent the state from becoming
     * {@code StateType.STOP}.
     */
    public void stop() {
        if (state == StateType.START && client != null) {
            try {
                client.disconnect();
            } catch (MqttException e) {
                log.error("stop", e);
            }
            closeQuietly(client);
        }
        state = StateType.STOP;
    }

    /**
     * Closes the given client, logging (not propagating) any failure.
     *
     * @param target client to close; may be {@code null}
     */
    private void closeQuietly(MqttClient target) {
        if (target == null) {
            return;
        }
        try {
            target.close();
        } catch (MqttException e) {
            log.error("close", e);
        }
    }

    /**
     * Sets the service state.
     *
     * @param state new state value
     */
    public void setState(int state) {
        this.state = state;
    }

    /**
     * Returns the service id. It must be globally unique and must not clash
     * with any field or variable name used in the logic that calls this
     * service.
     *
     * @return the service id
     */
    public String getId() {
        return "IotService";
    }

    /**
     * Returns the display name of the service.
     *
     * @return the service name
     */
    public String getName() {
        return "电梯监控";
    }

    /**
     * Returns the description of the service.
     *
     * @return the service description
     */
    public String getDescription() {
        return "电梯监控";
    }

    /**
     * Returns the time of the last successful start.
     *
     * @return the start time, or {@code null} if the service has never been
     *         started successfully
     */
    public Date getStartTime() {
        return startTime;
    }

    /**
     * Returns runtime information about the service.
     *
     * @return currently always an empty map
     */
    public Map getRuntimeInfo() {
        // Statistics could be added here (e.g. messages received, messages sent,
        // successes, failures) so that they show up in the management console.
        return Collections.emptyMap();
    }

    /**
     * Returns the current service state.
     *
     * @return the state value
     */
    public int getState() {
        return state;
    }

    /**
     * Indicates that this service may be called from logic. There,
     * {@code #id} refers to this instance; for example
     * {@code #id.getState()} returns the current service state.
     *
     * @return always {@code true}
     */
    public boolean isCallable() {
        return true;
    }

    /**
     * Handles a lost connection by attempting a single reconnect.
     *
     * @param throwable the cause of the connection loss
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        try {
            client.reconnect();
        } catch (MqttException e) {
            // Log the reconnect failure itself, not the original cause again.
            log.error("reconnect", e);
            try {
                // Brief pause before returning; no further retry is made here.
                Thread.sleep(2000);
            } catch (InterruptedException interrupted) {
                // Preserve the interrupt so the owning thread can observe it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Publishes a message to the configured {@code pubTopic} using the
     * configured {@code qos} (0 when missing or not a number), not retained.
     *
     * @param msg payload; see {@link #publish(String, Object, int, boolean)}
     *            for the supported types
     */
    public void publish(Object msg) {
        publish(ctx.getConfig("pubTopic"), msg, NumberUtils.toInt(ctx.getConfig("qos")), false);
    }

    /**
     * Publishes a message to the given topic.
     *
     * @param topic    topic to publish to
     * @param msg      payload: a {@code String} (encoded as UTF-8), a
     *                 {@code byte[]}, or an {@code InputStream} whose content is
     *                 read fully; the stream is not closed by this method
     * @param qos      quality of service level for the message
     * @param retained whether the broker should retain the message
     * @throws EasyPlatformRuntimeException if the payload type is not supported,
     *                                      the stream cannot be read, or the
     *                                      publish fails
     */
    public void publish(String topic, Object msg, int qos, boolean retained) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        try {
            if (msg instanceof String) {
                message.setPayload(((String) msg).getBytes(StandardCharsets.UTF_8));
            } else if (msg instanceof byte[]) {
                message.setPayload((byte[]) msg);
            } else if (msg instanceof InputStream) {
                message.setPayload(IOUtils.toByteArray((InputStream) msg));
            } else {
                throw new EasyPlatformRuntimeException("Unsupported yet");
            }
            client.publish(topic, message);
        } catch (MqttException | IOException e) {
            throw new EasyPlatformRuntimeException(e);
        }
    }

    /**
     * Receives a message and hands it to the configured logic asynchronously.
     * The logic receives a map with the keys {@code topic}, {@code qos},
     * {@code payload} (raw bytes) and {@code msgId}.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage the received message
     * @throws Exception if dispatching to the logic fails
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug(String.format(" messageArrived:%s,%d", topic, mqttMessage.getQos()));
        }
        // Capacity 8 holds the four entries without triggering a resize.
        Map<String, Object> data = new HashMap<>(8);
        data.put("topic", topic);
        data.put("qos", mqttMessage.getQos());
        data.put("payload", mqttMessage.getPayload());
        data.put("msgId", mqttMessage.getId());
        ctx.getProjectContext(ctx.getConfig("projectId")).asyncLogic(this, ctx.getConfig("logicId"), data);
    }

    /**
     * Called when delivery of a published message has completed. No action is
     * taken.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        // Intentionally empty.
    }
}